package org.tio.showcase.websocket.server.processor;

import cn.hutool.core.convert.impl.UUIDConverter;
import cn.hutool.core.io.resource.ResourceUtil;
import com.alibaba.fastjson.JSON;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.showcase.websocket.server.Const;
import org.tio.showcase.websocket.server.pojo.Msg;
import org.tio.showcase.websocket.server.pojo.User;
import org.tio.showcase.websocket.server.util.MsgUtil;
import org.tio.utils.Uuid;
import org.tio.websocket.common.WsRequest;

import java.io.IOException;
import java.util.UUID;

/**
 * Processing unit built on Redis publish/subscribe (accessed through Redisson).
 *
 * <p>Every non-heartbeat text frame received from a WebSocket client is published to the
 * topic named by {@link Const#WS_MSG_TOPIC_CHANNEL}. The listener registered on that same
 * topic then delivers the message through {@link MsgUtil} and publishes an acknowledgement
 * addressed to the original sender.
 */
public class ServerProcessorOnPubSub implements ServerProcessor {

    private static final Logger log = LoggerFactory.getLogger(ServerProcessorOnPubSub.class);

    /** Redisson client used for the per-user buckets and for the message topic. */
    private final RedissonClient client;

    /** Topic on which all client messages and their acknowledgements are exchanged. */
    private final RTopic<Msg> topic;

    /**
     * Loads {@code classpath:redisson.json}, creates the Redisson client, and subscribes
     * to the message topic.
     *
     * @throws IllegalStateException if the Redisson configuration cannot be read; without
     *                               it neither the client nor the topic can exist, so the
     *                               processor would be unusable
     */
    public ServerProcessorOnPubSub() {
        Config config;
        try {
            config = Config.fromJSON(ResourceUtil.getStream("classpath:redisson.json"));
        } catch (IOException e) {
            // Fail fast: continuing would leave client/topic unset and every later
            // callback would die with an unrelated NullPointerException.
            log.error("Failed to load Redisson configuration from classpath:redisson.json", e);
            throw new IllegalStateException(
                    "Failed to load Redisson configuration from classpath:redisson.json", e);
        }
        client = Redisson.create(config);
        topic = client.getTopic(Const.WS_MSG_TOPIC_CHANNEL);
        subcribeMsg();
    }

    /**
     * Records the newly connected user in Redis under {@code Const.WS_USER_PREFIX + username},
     * together with the server node the channel is attached to.
     *
     * <p>Any existing entry for the same user is overwritten unconditionally; a real
     * deployment would need a proper policy for repeated logins.
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String username = channelContext.getUserid();
        // TODO: look up the groups this user belongs to, e.g.
        //   Set<String> groups = userService.getUserGroups(username);
        //   for each group: Aio.bindGroup(channelContext, group);
        User user = new User();
        // user.setGroup(groups);
        user.setUsername(username);
        user.setNode(channelContext.getServerNode().toString());
        RBucket<User> userRBucket = client.getBucket(Const.WS_USER_PREFIX + username);
        userRBucket.set(user);
        log.info("用户{}加入", username);
    }

    /**
     * Removes the Redis entry of the user whose channel is about to close.
     */
    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        String username = channelContext.getUserid();
        client.getBucket(Const.WS_USER_PREFIX + username).delete();
        log.info("用户{}离开", username);
    }

    /**
     * Parses an incoming text frame as a {@link Msg} and publishes it to the topic.
     *
     * <p>Heartbeat requests are not published. Frames that parse to no message at all
     * (for example an empty string or the JSON literal {@code null}) are ignored.
     *
     * @return always {@code null}; nothing is returned from this method to the sender
     */
    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        Msg msg = JSON.parseObject(text, Msg.class);
        if (msg == null) {
            log.warn("Ignoring text frame that did not contain a message");
            return null;
        }
        // Heartbeat requests are ignored.
        if (Const.Action.HEART_BEAT_REQ.val() != msg.getAction()) {
            // Assign a dash-free UUID so the acknowledgement can be correlated with the request.
            msg.setId(UUID.randomUUID().toString().replace("-", ""));
            topic.publish(msg);
        }
        return null;
    }

    /**
     * Registers the listener that handles every message published on the topic.
     */
    private void subcribeMsg() {
        topic.addListener(new MessageListener<Msg>() {
            @Override
            public void onMessage(String channel, Msg msg) {
                handleTopicMessage(msg);
            }
        });
    }

    /**
     * Handles one message received from the topic: forwards responses to their recipient,
     * or delivers a request and publishes the matching acknowledgement.
     *
     * @param msg the message taken from the topic; {@code null} is ignored
     */
    private void handleTopicMessage(Msg msg) {
        if (msg == null) {
            return;
        }
        int action = msg.getAction();

        // Per the original author's note, this branch handles response messages, which are
        // handed straight to the client. NOTE(review): it relies on response action codes
        // being multiples of 11 - confirm against Const.Action.
        if (action % 11 == 0 && MsgUtil.existsUser(msg.getTo())) {
            // Re-wrap before sending: only the payload, action and status are forwarded.
            Msg forwarded = new Msg();
            forwarded.setMsg(msg.getMsg());
            forwarded.setAction(msg.getAction());
            forwarded.setStatus(msg.getStatus());
            MsgUtil.sendToUser(msg.getTo(), forwarded);
            return;
        }

        // Acknowledgement addressed back to the sender, carrying the request id.
        Msg respMsg = new Msg();
        respMsg.setTo(msg.getFrom());
        respMsg.setStatus("200");
        respMsg.setId(msg.getId());

        if (action == Const.Action.P2P_MSG_REQ.val()) {
            respMsg.setAction(Const.Action.P2P_MSG_RESP.val());
            // Deliver and acknowledge only where MsgUtil reports the recipient as present.
            if (MsgUtil.existsUser(msg.getTo())) {
                MsgUtil.sendToUser(msg.getTo(), msg);
                topic.publish(respMsg);
            }
        } else if (action == Const.Action.GROUP_MSG_REQ.val()) {
            MsgUtil.sendToGroup(msg.getTo(), msg);
            respMsg.setAction(Const.Action.GROUP_MSG_RESP.val());
            topic.publish(respMsg);
        }
    }
}
